package 并发队列.阻塞队列;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 使用阻塞队列模拟生产者与消费者
 * Created by huangyu on 2018/9/1 16:40
 */

class Producer implements Runnable {
	private BlockingQueue queue;
	private volatile boolean flag = true;
	private static AtomicInteger atomicInteger = new AtomicInteger();

	public Producer(BlockingQueue queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			System.out.println("生产线程启动");
			while (flag) {
				System.out.println("正在生产数据...");
				String data = atomicInteger.incrementAndGet() + "";
				// 将数据存入队列中
				boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
				if (offer) {
					System.out.println("生产者存入" + data + "到队列中,成功");
				} else {
					System.out.println("生产者存入" + data + "到队列中,失败");
				}
				Thread.sleep(1000);
			}
		} catch (Exception e) {

		} finally {
			System.out.println("#########生产者线程停止#########");
		}
	}

	public void stop() {
		this.flag = false;
	}
}


class Consumer implements Runnable {
	private BlockingQueue<String> queue;
	private volatile boolean flag = true;

	public Consumer(BlockingQueue<String> queue) {
		this.queue = queue;

	}

	@Override
	public void run() {
		System.out.println("消费线程启动");
		try {
			while (flag) {
				System.out.println("消费者,正在从队列中获取数据...");
				String data = queue.poll(2, TimeUnit.SECONDS);
				if (data != null) {
					System.out.println("消费者,拿到队列中的数据data:" + data);
					Thread.sleep(1000);
				} else {
					System.out.println("消费者,超过2秒未获取到数据:" + data);
					flag = false;
				}


			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			System.out.println("#########消费者线程停止#########");
		}

	}
}

public class ProducerAndComsumerByQueue {
	public static void main(String[] args) {
		BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
		Producer producerThread1 = new Producer(queue);
//		Producer producerThread2 = new Producer(queue);
		Consumer consumerThread1 = new Consumer(queue);
		Thread p1 = new Thread(producerThread1);
//		Thread p2 = new Thread(producerThread2);
		Thread c1 = new Thread(consumerThread1);
		p1.start();
//		p2.start();
		c1.start();

		try {
			Thread.sleep(10 * 1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		producerThread1.stop();
//		producerThread2.stop();
	}
}
